Migrate to use systemd events #217

Open
wants to merge 13 commits into
base: main
from

Conversation

jordigilh
Contributor

@jordigilh jordigilh commented Jul 29, 2022

  • Changes to use systemd events instead of relying on podman events.
  • Refactored systemd functions for clarity.
    - Use fsNotify to capture file system changes in the .config/systemd/user/ directory. Whenever something changes in this directory, a routine in flotta receives the event and determines whether a service was enabled, disabled, or something else happened. If a service was enabled, for instance, the routine adds it to the list of services watched by the event listener, which then receives the status of that service as events over dbus. It might look cumbersome, but it's consistent with systemd:
    • One routine to watch for service files to monitor
    • Another routine to watch for status of those services.
  • Added the String() function to the Observer interface in configuration.go so that debug output lists the registered observers by name instead of by memory reference.
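
A minimal sketch of the String() addition from the last bullet. The Observer interface here is trimmed to that single method, and the metricsObserver and logsObserver types are hypothetical stand-ins rather than the types in configuration.go.

```go
package main

import (
	"fmt"
	"strings"
)

// Observer is a trimmed, hypothetical version of the interface: only the
// String() method discussed in the PR description is shown.
type Observer interface {
	fmt.Stringer
}

// metricsObserver and logsObserver are illustrative observers.
type metricsObserver struct{}

func (*metricsObserver) String() string { return "metrics" }

type logsObserver struct{}

func (*logsObserver) String() string { return "logs" }

// observerNames joins the observers' names for use in a debug message,
// instead of printing pointer values.
func observerNames(observers []Observer) string {
	names := make([]string, 0, len(observers))
	for _, o := range observers {
		names = append(names, o.String())
	}
	return strings.Join(names, ", ")
}

func main() {
	observers := []Observer{&metricsObserver{}, &logsObserver{}}
	fmt.Printf("registered observers: %s\n", observerNames(observers))
}
```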

The services.json file remains because it is required to identify which service files are linked to a workload. We could get that information from systemd's service unit definition, but I don't want to grow this PR any further: it is starting to look like a behemoth when it was supposed to be horse-sized.

Note: I tried to create an interface layer between dbus and the event listener so I could add unit tests. Unfortunately, the initial effort only added complexity because the dbus package does not provide interfaces. Suggestions are welcome.
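
One possible shape for such a layer, as a sketch only: the listener depends on a narrow interface that it defines itself, a production adapter (not shown) would wrap the dbus connection, and tests supply a fake. Every name below (unitStatus, unitWatcher, fakeWatcher, collectRunning) is hypothetical and not part of this PR or of the dbus package.

```go
package main

import "fmt"

// unitStatus carries only the fields the listener needs (hypothetical type).
type unitStatus struct {
	Name     string
	SubState string
}

// unitWatcher is the narrow, consumer-defined interface. A real adapter
// would wrap the dbus connection; that adapter is not shown here.
type unitWatcher interface {
	// Updates returns a channel of unit status changes.
	Updates() <-chan unitStatus
}

// fakeWatcher replays canned updates and stands in for dbus in tests.
type fakeWatcher struct {
	updates []unitStatus
}

func (f *fakeWatcher) Updates() <-chan unitStatus {
	ch := make(chan unitStatus, len(f.updates))
	for _, u := range f.updates {
		ch <- u
	}
	close(ch)
	return ch
}

// collectRunning drains the watcher and returns the units seen running.
func collectRunning(w unitWatcher) []string {
	var running []string
	for u := range w.Updates() {
		if u.SubState == "running" {
			running = append(running, u.Name)
		}
	}
	return running
}

func main() {
	w := &fakeWatcher{updates: []unitStatus{
		{Name: "mylog.service", SubState: "start-pre"},
		{Name: "mylog.service", SubState: "running"},
	}}
	fmt.Println(collectRunning(w))
}
```

The trade-off is one extra adapter type in production code in exchange for listener logic that can be tested without a running systemd.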

@jordigilh jordigilh changed the title Migrate to use systemd events [WIP] Migrate to use systemd events Jul 29, 2022
@jordigilh
Contributor Author

Failing to properly configure the file permissions for the volumes directory when run in a container. No issues when run in a VM. Investigating.

@jordigilh jordigilh changed the title [WIP] Migrate to use systemd events Migrate to use systemd events Jul 29, 2022
@jordigilh
Contributor Author

> Failing to properly configure the file permissions for the volumes directory when run in a container. No issues when run in a VM. Investigating.

Fixed in project-flotta/flotta-operator#290. It is not a showstopper for this PR, as it only has an impact when running a custom RPM for the end-to-end tests.

internal/service/systemd.go (outdated)
func (e *EventListener) Add(workloadName string) {
	name := fmt.Sprintf("%s.service", workloadName)
	log.Debugf("Adding service for events %s", name)
	if !e.set.Contains(name) {
Collaborator

In which case might this happen?
An attempt to add an already created service should be translated to a 'replace', but I don't think we should hit this case.

Contributor Author

It's a safeguard in case the service is already part of the filter. In theory this should never happen in the nominal flow: if a service is replaced, it is first removed and then re-added by the events.

Contributor Author

Ended up removing it with the new logic to watch for file system events. The case where the service could be added twice is not possible anymore.

}

func (e *EventListener) Add(workloadName string) {
name := fmt.Sprintf("%s.service", workloadName)
Collaborator

Please pull fmt.Sprintf("%s.service", workloadName) into its own method so that Remove can reuse it.

Contributor Author

done
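
A sketch of what such a helper pair could look like. ServiceSuffix, DefaultServiceName and extractWorkloadName are names that appear in other hunks of this PR, but the bodies below are assumptions written for illustration, as is the error message.

```go
package main

import (
	"fmt"
	"strings"
)

// ServiceSuffix matches the constant visible in the diff.
const ServiceSuffix = ".service"

// DefaultServiceName maps a workload name to its systemd unit name.
// The body is an assumption based on the fmt.Sprintf call under review.
func DefaultServiceName(workloadName string) string {
	return workloadName + ServiceSuffix
}

// extractWorkloadName is the inverse mapping; the error text is illustrative.
func extractWorkloadName(serviceName string) (string, error) {
	if !strings.HasSuffix(serviceName, ServiceSuffix) {
		return "", fmt.Errorf("invalid service name %q: missing %q suffix", serviceName, ServiceSuffix)
	}
	return strings.TrimSuffix(serviceName, ServiceSuffix), nil
}

func main() {
	name := DefaultServiceName("mylog")
	fmt.Println(name) // mylog.service

	wl, err := extractWorkloadName(name)
	fmt.Println(wl, err) // mylog <nil>
}
```

Keeping both directions next to each other means Add, Remove and the event handler cannot drift apart on the naming rule.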

internal/service/event_listener.go (outdated)
internal/service/event_listener.go (outdated)
internal/workload/wrapper.go (outdated)
@@ -20,12 +20,20 @@ const (
DefaultRestartTimeout = 15
TimerSuffix = ".timer"
ServiceSuffix = ".service"
DefaultNameSeparator = "-"
Collaborator

this isn't used.

Contributor Author

Deleted. I wonder why no checks flagged this in the PR workflow.


err := svc.Stop()
if err != nil {
	log.Errorf("unable to stop service %s:%s", workloadName, err)
Collaborator

s/unable/Unable ?

Contributor Author

Done

if err != nil {
return nil
log.Errorf("unable to remove service from serviceManager %s:%s", workloadName, err)
Collaborator

s/unable/Unable ?

Contributor Author

Done

@eloycoto
Collaborator

eloycoto commented Aug 2, 2022

@jordigilh what I mentioned yesterday regarding restarts:

Initialize all as normal, and deploy a workload; you will get the info here:
https://github.com/project-flotta/flotta-device-worker/blob/main/internal/metrics/workload.go#L55

After all the pods are running, you need to restart yggdrasild:

systemctl restart yggdrasild

There is no workloadStarted notification, so the metrics are not updated at all and the workload is not scraped as it should be.

@jordigilh
Contributor Author

jordigilh commented Aug 2, 2022

> @jordigilh what I mentioned yesterday regarding restarts:
>
> Initialize all as normal, and deploy a workload; you will get the info here: https://github.com/project-flotta/flotta-device-worker/blob/main/internal/metrics/workload.go#L55
>
> After all the pods are running, you need to restart yggdrasild:
>
> systemctl restart yggdrasild
>
> There is no workloadStarted notification, so the metrics are not updated at all and the workload is not scraped as it should be.

That's weird, I'm actually seeing it:

[root@fedora ~]# systemctl restart yggdrasild
[root@fedora ~]# Aug 02 11:14:39 fedora systemd[1]: Stopping yggdrasild.service - yggdrasil daemon...
Aug 02 11:14:39 fedora systemd[1]: yggdrasild.service: Killing process 1098 (device-worker) with signal SIGKILL.
Aug 02 11:14:39 fedora systemd[1]: yggdrasild.service: Deactivated successfully.
Aug 02 11:14:39 fedora systemd[1]: Stopped yggdrasild.service - yggdrasil daemon.
Aug 02 11:14:39 fedora systemd[1]: yggdrasild.service: Consumed 2.286s CPU time.
Aug 02 11:14:39 fedora systemd[1]: Started yggdrasild.service - yggdrasil daemon.
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 starting yggdrasild version 0.2.99
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 cannot start watching '/etc/yggdrasil/tags.toml': lstat /etc/yggdrasil/tags.toml: no such file or directory
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 listening on socket: @yggd-dispatcher-vTDVZH
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 cannot route message  123d34f8-412f-4eba-b718-eddffc010444 to directive: device
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 worker registered: {pid:1225 handler:device addr:@ygg-device-pqOEQv features:map[] detachedContent:false}
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 /usr/libexec/yggdrasil/device-worker: Data directory: /etc/yggdrasil/device
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 /usr/libexec/yggdrasil/device-worker: device config file: /etc/yggdrasil/device/device-config.json
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 /usr/libexec/yggdrasil/device-worker: started targetMetric for workload 'system'
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 /usr/libexec/yggdrasil/device-worker: started targetMetric for workload 'data transfer'
Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 /usr/libexec/yggdrasil/device-worker: Service for workload mylog started

Look at the last line:

Aug 02 11:14:39 fedora yggdrasild[1219]: [yggdrasild] 2022/08/02 11:14:39 /usr/libexec/yggdrasil/device-worker: Service for workload mylog started

Which is generated in this line:
https://github.com/project-flotta/flotta-device-worker/pull/217/files#diff-27f5b4db08ed174b7b18101c679d78c4dcf69db6fd8d6772f6a462f0fd6ae453R115

That's the only workload available for the device:

$> oc get edgedevice 4233c45699b644b79107306e74bccbc5  -ojsonpath='{.status.workloads}' | jq .
[
  {
    "lastTransitionTime": "2022-08-02T15:15:39Z",
    "name": "mylog",
    "phase": "Running"
  }
]

@eloycoto
Collaborator

eloycoto commented Aug 2, 2022

maybe the channel is not initialized at that time?

@jordigilh
Contributor Author

jordigilh commented Aug 2, 2022

> maybe the channel is not initialized at that time?

In that case the message should not have been received by the wrapper. I'm testing with a workload and an edgedevice that have neither metrics nor logs, so maybe that's why there are no other messages.
This is the log output when the workload is first created:

Aug 02 11:33:41 fedora yggdrasild[3484]: [yggdrasild] 2022/08/02 11:33:41 /usr/libexec/yggdrasil/device-worker: updating configuration. New config: {"configuration":{"heartbeat":{"hardware_profile":{},"period_seconds":60},"metrics":{"receiver":{"request_num_samples":30000,"timeout_seconds":10}},"mounts":[]},"device_id":"4233c45699b644b79107306e74bccbc5","version":"206051","workloads":[{"name":"mylog","namespace":"default","specification":"containers:\n- image: docker.io/eloycoto/logexample\n  name: log\n  resources: {}\n"}]}
Aug 02 11:33:41 fedora yggdrasild[3484]: [yggdrasild] 2022/08/02 11:33:41 /usr/libexec/yggdrasil/device-worker: Old config: {"configuration":{"heartbeat":{"hardware_profile":{},"period_seconds":60},"metrics":{"receiver":{"request_num_samples":30000,"timeout_seconds":10}},"mounts":[]},"device_id":"4233c45699b644b79107306e74bccbc5","version":"205749"}
Aug 02 11:33:41 fedora yggdrasild[3484]: [yggdrasild] 2022/08/02 11:33:41 /usr/libexec/yggdrasil/device-worker: Creating service for mylog
Aug 02 11:33:41 fedora yggdrasild[3484]: [yggdrasild] 2022/08/02 11:33:41 /usr/libexec/yggdrasil/device-worker: writing new systemd file for 'container-mylog-log' on '/var/home/flotta/.config/systemd/user/container-mylog-log.service'
Aug 02 11:33:41 fedora yggdrasild[3484]: [yggdrasild] 2022/08/02 11:33:41 /usr/libexec/yggdrasil/device-worker: writing new systemd file for 'mylog' on '/var/home/flotta/.config/systemd/user/mylog.service'
Aug 02 11:33:42 fedora yggdrasild[3484]: [yggdrasild] 2022/08/02 11:33:42 /usr/libexec/yggdrasil/device-worker: Adding service for listener for mylog
Aug 02 11:33:42 fedora yggdrasild[3484]: [yggdrasild] 2022/08/02 11:33:42 /usr/libexec/yggdrasil/device-worker: Service for workload mylog started

Notice that there are no additional messages for metrics or logs after "Service for workload mylog started".

@eloycoto
Collaborator

eloycoto commented Aug 2, 2022

This is how I'm testing, and that should be received, but it's not.
https://drive.google.com/file/d/13sfK0L7eACQRhZInzAGfQWtPcKvMHZZ0/view?usp=sharing

@jordigilh
Contributor Author

> This is how I'm testing, and that should be received, but it's not.
>
> https://drive.google.com/file/d/13sfK0L7eACQRhZInzAGfQWtPcKvMHZZ0/view?usp=sharing

Confirmed. The problem is that the observers are not instantiated before the events are triggered. I will refactor the code to initialize the observers before the event listener is instantiated.
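
The ordering problem can be illustrated with a small sketch. Event, Observer and recorder below are simplified, hypothetical stand-ins for the PR's types; the point is only that observers are registered before anything starts consuming events, so an event emitted right after a restart is not lost.

```go
package main

import "fmt"

// Event and Observer are simplified stand-ins for the PR's types.
type Event struct{ WorkloadName string }

type Observer interface {
	WorkloadStarted(name string)
}

// recorder is a hypothetical observer that remembers started workloads.
type recorder struct{ started []string }

func (r *recorder) WorkloadStarted(name string) { r.started = append(r.started, name) }

// dispatch fans each event out to every observer until the channel closes.
func dispatch(events <-chan Event, observers []Observer) {
	for ev := range events {
		for _, o := range observers {
			o.WorkloadStarted(ev.WorkloadName)
		}
	}
}

func main() {
	// 1. Register every observer first.
	metrics := &recorder{}
	observers := []Observer{metrics}

	// 2. Events emitted early wait in the buffered channel.
	events := make(chan Event, 8)
	events <- Event{WorkloadName: "mylog"} // e.g. emitted right after a restart
	close(events)

	// 3. Only then start consuming events.
	done := make(chan struct{})
	go func() {
		dispatch(events, observers)
		close(done)
	}()
	<-done

	fmt.Println(metrics.started)
}
```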

@jordigilh
Contributor Author

jordigilh commented Aug 4, 2022

@eloycoto can you try again? I made it Eloy-fail-proof now :)

@jordigilh
Contributor Author

@masayag @tupyy I made a significant refactoring, so you might want to take another look at the whole thing.

@jordigilh jordigilh force-pushed the test_podman_events branch 7 times, most recently from bb407e0 to 362dcd5 Compare August 4, 2022 20:03
cmd/device-worker/main.go (outdated)
@jordigilh
Contributor Author

jordigilh commented Aug 8, 2022

@masayag @eloycoto I've refactored the code and removed the FS Notify logic. Instead, the event listener is now an Observer and uses the device config as the source of truth for which services to watch. Take a look at the last commit if you don't want to be bothered with the rest.

One thing that still bothers me is that we don't have any means to track other enabled user services, in case somehow a new service is added to the user outside of the agent's control.
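
A rough sketch of the idea in this comment, under stated assumptions: Workload and DeviceConfig are reduced, hypothetical versions of the device configuration, and Update rebuilds the watched set on every configuration change, which may differ from how the PR tracks services. A real listener would also need locking, which is omitted here.

```go
package main

import (
	"fmt"
	"sort"
)

// Workload and DeviceConfig are reduced, hypothetical versions of the
// device configuration message.
type Workload struct{ Name string }

type DeviceConfig struct{ Workloads []Workload }

// EventListener keeps the set of unit names whose events it cares about.
type EventListener struct {
	watched map[string]struct{}
}

// Update replaces the watched set with the services named in the config,
// so removed workloads stop being watched and new ones start.
func (e *EventListener) Update(cfg DeviceConfig) {
	next := make(map[string]struct{}, len(cfg.Workloads))
	for _, wl := range cfg.Workloads {
		next[wl.Name+".service"] = struct{}{}
	}
	e.watched = next
}

// Watched returns the watched unit names in sorted order.
func (e *EventListener) Watched() []string {
	names := make([]string, 0, len(e.watched))
	for name := range e.watched {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	l := &EventListener{}
	l.Update(DeviceConfig{Workloads: []Workload{{Name: "mylog"}}})
	fmt.Println(l.Watched())

	l.Update(DeviceConfig{}) // workload removed from the config
	fmt.Println(l.Watched())
}
```

As the comment notes, this only covers services that the agent itself configures; units enabled outside its control stay invisible to the listener.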

removed unitSubState = "removed"
)

var ()
Contributor

pls remove it

Contributor Author

Done.

@jordigilh jordigilh force-pushed the test_podman_events branch 3 times, most recently from d14c973 to c825ed8 Compare August 8, 2022 20:23
@@ -20,7 +20,8 @@ import (
os2 "github.com/project-flotta/flotta-device-worker/internal/os"
registration2 "github.com/project-flotta/flotta-device-worker/internal/registration"
"github.com/project-flotta/flotta-device-worker/internal/server"
workload2 "github.com/project-flotta/flotta-device-worker/internal/workload"
"github.com/project-flotta/flotta-device-worker/internal/service"
workload "github.com/project-flotta/flotta-device-worker/internal/workload"
Collaborator

this workload alias is not needed at all, no?

Contributor Author

yeah, will remove it.

Contributor Author

done

log.Infof("Starting DBus event listener")
conn, err := newDbusConnection(UserBus)
if err != nil {
	return err
Collaborator

Should this be formatted with a message?

Contributor Author

I was not sure what the added value would be. Any suggestion on what to append?

Contributor Author

ok I added this:

fmt.Errorf("error while starting event listener: %v", err)
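
A side note on the wrapped error above, as a sketch rather than a requested change: with `%v` the cause is flattened into text, while `%w` keeps it in the error chain so callers can still test for it with errors.Is. The errNoBus sentinel and the helper functions are hypothetical and only serve the demonstration.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoBus is a hypothetical sentinel standing in for a dbus failure.
var errNoBus = errors.New("dbus connection refused")

// startWithV wraps the cause with %v, as in the comment above.
func startWithV() error {
	return fmt.Errorf("error while starting event listener: %v", errNoBus)
}

// startWithW wraps the cause with %w instead.
func startWithW() error {
	return fmt.Errorf("error while starting event listener: %w", errNoBus)
}

// wrapsCause reports whether err still carries errNoBus in its chain.
func wrapsCause(err error) bool {
	return errors.Is(err, errNoBus)
}

func main() {
	fmt.Println(wrapsCause(startWithV())) // false: %v flattens the cause to text
	fmt.Println(wrapsCause(startWithW())) // true: %w keeps the cause in the chain
}
```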

svcName := DefaultServiceName(wl.Name)
if !e.contains(svcName) {
	e.add(svcName)
}
Collaborator

Shouldn't the else branch raise an error?

Contributor Author

Ok will add.

Contributor Author

done

log.Debugf("Captured DBus event for %s: %v+", name, unit)
n, err := extractWorkloadName(name)
if err != nil {
	log.Error(err)
Collaborator

I would add a wrapper error message here.

Contributor Author

any suggestions? The message already contains the service name

continue
}
state := translateUnitSubStatus(unit)
log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state)
Collaborator

I would change the level to Info.

Contributor Author

@jordigilh jordigilh Sep 23, 2022

It can be too verbose with a few workloads in place. I had them all at Info earlier and it was dumping too much information. Keep in mind that this message is printed for every change in the unit's status, even when the change is ignored because the new sub-state is not relevant.

The changes in state are already captured here:

log.Infof("Service for workload %s started", event.WorkloadName)

	log.Debugf("Sending stop event to observer channel for workload %s", n)
	e.eventCh <- &Event{WorkloadName: n, Type: EventStopped}
default:
	log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState)
Collaborator

Shouldn't this be an error? What cases are missing here?

Contributor Author

There are a few sub states we ignore because they are not relevant to our needs:

  • post-start
  • post-stop
  • pre-start
  • ...

The ones we capture are the ones we want to act on.
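
The filtering described in this reply can be sketched as a switch with an explicit "ignore" default. EventStopped appears in the diff; EventStarted, eventFor, and the choice of which sub states map to which event are assumptions made for this sketch. The sub state strings use systemd's own spelling (for example start-pre and stop-post).

```go
package main

import "fmt"

// EventType mirrors the event kinds used by the listener. EventStopped is
// visible in the diff; EventStarted is assumed here.
type EventType string

const (
	EventStarted EventType = "started"
	EventStopped EventType = "stopped"
)

// eventFor maps a systemd sub state to an event type. The second result is
// false for sub states that should be ignored. Which sub states count as
// started or stopped is an assumption made for this sketch.
func eventFor(subState string) (EventType, bool) {
	switch subState {
	case "running":
		return EventStarted, true
	case "dead", "failed":
		return EventStopped, true
	default:
		// Transitional states such as start-pre or stop-post need no action.
		return "", false
	}
}

func main() {
	for _, s := range []string{"start-pre", "running", "stop-post", "dead"} {
		if ev, ok := eventFor(s); ok {
			fmt.Printf("%s -> %s\n", s, ev)
		} else {
			fmt.Printf("%s -> ignored\n", s)
		}
	}
}
```

Returning a boolean instead of an error keeps ignored transitions out of the error log, which matches the choice of Debug level for the default branch.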

@@ -267,7 +262,8 @@ func (s *systemd) Stop() error {
}

func (s *systemd) Enable() error {
	conn, err := newDbusConnection(s.Rootless)
	log.Debugf("Enabling service %s", s.Name)
Collaborator

Enabling systemd service %v

Contributor Author

sounds good.

Contributor Author

done with %s

@@ -278,7 +274,8 @@ func (s *systemd) Enable() error {
}

func (s *systemd) Disable() error {
	conn, err := newDbusConnection(s.Rootless)
	log.Debugf("Disabling service %s", s.Name)
Collaborator

Disabling systemd service

Contributor Author

sounds good

@@ -15,9 +15,9 @@ import (
"github.com/blang/semver"
"github.com/go-openapi/swag"
"github.com/project-flotta/flotta-device-worker/internal/service"
api "github.com/project-flotta/flotta-device-worker/internal/workload/api"
Collaborator

this alias is not needed.

Contributor Author

acknowledged

Contributor Author

done

log.Infof("Service for workload %s started", event.WorkloadName)
report, err := w.podManager.GetPodReportForPodName(event.WorkloadName)
if err != nil {
	log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err)
Collaborator

I would write workload '%s' here, as in line 364.

Contributor Author

understood

Contributor Author

done

observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report})
}
case service.EventStopped:
log.Infof("Service for workload %s stopped", event.WorkloadName)
Collaborator

'%s'

Contributor Author

understood

Contributor Author

done

* Moved event listener instance and initialization to cmd.go main
* Added a file system watcher on the systemd enabled services directory to be notified when services are enabled or disabled, so that the event listener can track their status
* Encapsulated the code that listens to service events generated by the systemd event listener so that it is called on the last line of the cmd.go#main function, making sure all observers are ready before the service events start being processed
…ceived to avoid potential race condition between file system event and dbus event
@jordigilh
Contributor Author

@eloycoto I've addressed some of your comments, and there are a few that I've replied to. PTAL when you have spare time.

5 participants